// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#pragma once
#include <memory>
#include "abstract_memory_arena.h"
#include <pure_mem/uncoded_version_node.h>
#include <pure_mem/encoded_version_node.h>
#include "pure_mem/key_index/inline_uk_index.h"
#include "pure_mem/pmemrep.h"
#include "pure_mem/memoryblock/memory_arena/mem_iter.h"
#include "db/range_del_aggregator.h"

namespace rocksdb {

//liliupeng 内存存储区接口
typedef void* KeyHandle;
typedef InlineUserKeyIndex<const MemTableRep::KeyComparator &> ArtTree;

class MemArena : public IMemoryArena {
public:
    class KeyComparator : public MemTableRep::KeyComparator {
    public:
        const InternalKeyComparator comparator;
        explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
        int operator()(const char* prefix_len_key1,
                               const char* prefix_len_key2) const override;
        int operator()(const char* prefix_len_key,
                               const DecodedType& key) const override;
        int operator()(const Slice &key1, const Slice &key2,
                       SequenceNumber seq1, SequenceNumber seq2) const override ;
        int operator()(const Slice &key, SequenceNumber seq,
                       const char *prefix_len_key) const override ;
        int operator()(const Slice &key_a, SequenceNumber seq,
                       const Slice &key_b) const override;
};

public:
    explicit MemArena(const InternalKeyComparator& mkeyCmp, const ImmutableCFOptions& ioptions);
    ~MemArena() override;

    static const char *GenerateNode(bool encode_version_node, const Slice &key,
                                    const Slice &value, size_t *encoded_len);
    //范围删除插入,范围删除单独使用一个ART树
    bool RangeDeletionInsert(const Slice& key, const Slice& value) override ;
    void AnalysisNode(MvccKey& mk, ArtTree::Iterator * iter,
        uint64_t *iter_seq, ValueType *iter_t, Slice& end);

    //普通kv数据插入
    bool KVDataInsert(const Slice& key, const Slice& value);
    bool Insert(const Slice& key, const Slice& value) override ;

    //liliupeng 先在Hash中查询，查询不到则再进行ART查询
    //需要判断返回的是不是nullptr,如果是，说明当前key在范围删除内
    ArtTree::Iterator *Seek(const Slice& key) override ;
    ArtTree::Iterator *RangeDeletionSeek(const Slice& key);

    //L0存储的所有数据量
    size_t GetTotalMemVolume() const override {
      return kv_insert_num_.load();
    }

    ArtTree& GetKVList(){
      return kv_list_;
    }

    //内存存储区 生成InternalIterator一个迭代器
    InternalIterator* NewIterator(bool use_range_del_table) {
      if(use_range_del_table){
        return new MemoryArenaIter(&range_del_list_, moptions_.inplace_update_support,
            moptions_.encode_version_node);
      } else{
        return new MemoryArenaIter(&kv_list_, moptions_.inplace_update_support,
            moptions_.encode_version_node);
      }
    }

    //内存存储区 生成一个迭代器
    std::shared_ptr<MemoryArenaIter> NewMemIter(bool use_range_del_table) {
        if(use_range_del_table){
            return std::make_shared<MemoryArenaIter>(&range_del_list_,
                moptions_.inplace_update_support, moptions_.encode_version_node);
        } else{
            return std::make_shared<MemoryArenaIter>(&kv_list_,
                moptions_.inplace_update_support, moptions_.encode_version_node);
        }
    }

    //提供给NewRangeTombstoneIterator用
    std::unique_ptr<InternalIterator> NewRangeDelMemIter() {
      std::unique_ptr<InternalIterator> iter = make_unique<MemoryArenaIter>(&range_del_list_,
          moptions_.inplace_update_support, moptions_.encode_version_node);
      return iter;
    }

    uint64_t GetOldestKeyTime() const { return oldest_key_time_; }

    bool GetKVInsertNum() const { return kv_insert_num_.load() == 0; }

    const InternalKeyComparator& GetInternalKeyComparator() const {
        return mkey_cmp_.comparator;
    }

    void Ref();

    bool Unref();

    // Get the lock associated for the key
    port::RWMutex* GetLock(const Slice& key);

    FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(SequenceNumber read_seq, const InternalKeyComparator& cmp){
      auto unfragmented_iter = this->NewRangeDelMemIter();
      if (unfragmented_iter == nullptr) {
        return nullptr;
      }

      auto fragmented_tombstone_list =
          std::make_shared<FragmentedRangeTombstoneList>(std::move(unfragmented_iter), cmp);

      auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
          fragmented_tombstone_list, cmp, read_seq);
      return fragmented_iter;
    }


    Status AddRangeTombstoneIterators(const ReadOptions& read_opts,
                                      RangeDelAggregator* range_del_agg);

     bool Get(const LookupKey& key, std::string* value, Status* s,
              MergeContext* merge_context,
              SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
              const ReadOptions& read_opts, ReadCallback* callback = nullptr,
              bool* is_blob_index = nullptr);

     bool Get(const LookupKey& key, std::string* value, Status* s,
              MergeContext* merge_context,
              SequenceNumber* max_covering_tombstone_seq,
              const ReadOptions& read_opts, ReadCallback* callback = nullptr,
              bool* is_blob_index = nullptr) {
         SequenceNumber seq;
         return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq,
                    read_opts, callback, is_blob_index);
     }

   void Get(const LookupKey& k, void* callback_args,
                         bool (*callback_func)(void* arg, const char* entry, VersionNode* node)) {
       auto iter = NewMemIter(false);
       for (iter->Seek(k.internal_key());
            iter->Valid() && callback_func(callback_args, iter->Key(), iter->node());
            iter->Next()) {
       }
   }


private:
    KeyComparator mkey_cmp_;
    const ImmutableCFOptions& moptions_;
    ArtTree kv_list_{mkey_cmp_, ART_ROWEX_FULLKEY};
    ArtTree range_del_list_{mkey_cmp_, ART_ROWEX_FULLKEY};
    std::atomic<size_t> kv_insert_num_{0};
    uint64_t oldest_key_time_;
    int refs_;
    // rw locks for inplace updates
    std::vector<port::RWMutex> locks_;

};
}


